package com.dahuan.source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        Properties properties = new Properties();
        properties.setProperty( "bootstrap.servers","Bigdata:9092" );
        properties.setProperty( "group.id","consumer-group" );
        DataStreamSource<String> source = env.addSource( new FlinkKafkaConsumer011<String>( "sensor", new SimpleStringSchema(), properties ) );

        source.print();
        env.execute("KafkaSource");


    }
}
